
# Build within-cycle clusters of FEC records.
#
# For every even cycle 2008-2020 this script:
#   1. reads the pairwise matches stored under data/fec_dedupe_matches,
#   2. treats each retained pair as an undirected edge and labels every
#      connected component as one cluster (`year_cluster`),
#   3. attaches the cluster label to all rows of that cycle via
#      `exact_person_id`, giving unmatched ids a cluster of their own,
#   4. flags one row per cluster as `year_cluster_best`,
#   5. writes the result to data/fec_across/cycle_table={year}.

# Minimum `match_probability` a pair needs to be kept as an edge.
cluster_thresh = 0.5

# The match directories are only read by this script, so list them once
# rather than twice per cycle.
match_dirs = list.dirs('data/fec_dedupe_matches', full.names = TRUE, recursive = FALSE)

# Open every directory in `dirs` whose path contains `{year}{tag}` and combine
# them into a single Arrow dataset.
#   dirs: character vector of directory paths
#   year: cycle being processed
#   tag:  suffix identifying the match type ('_a' or '_p')
# Stops early with a readable message when no directory matches.
open_year_matches = function(dirs, year, tag){
  hits = dirs[grepl(glue('{year}{tag}'), dirs)]
  if(length(hits) == 0){
    stop(glue('No match directories containing "{year}{tag}" found in data/fec_dedupe_matches'), call. = FALSE)
  }
  hits %>%
    lapply(open_dataset) %>%
    open_dataset()
}

for(year in seq(2008,2020,2)){
  print(glue('Working in {year}...'))
  
  # Prefix of every cluster label in this cycle (two-digit year, e.g. 'f-08-').
  # Computed here, outside data.table's `j`, so that a column named `year`
  # cannot shadow the loop variable.
  cluster_prefix = glue('f-{substr(year,3,4)}-')
  
  # Pairs from the '_a' match directories, kept when gamma_contbr_addr is
  # positive or -1 and the pair clears the probability threshold.
  # NOTE(review): -1 looks like a "missing value" comparison level - confirm.
  addr_matches = open_year_matches(match_dirs, year, '_a') %>%
    filter(gamma_contbr_addr > 0 | gamma_contbr_addr == -1, match_probability >= cluster_thresh) %>%
    select(emmid_l, emmid_r) %>%
    collect() %>%
    as.data.table()
  
  # Same for the '_p' match directories, using gamma_po_num.
  po_matches = open_year_matches(match_dirs, year, '_p') %>%
    filter(gamma_po_num > 0 | gamma_po_num == -1, match_probability >= cluster_thresh) %>%
    select(emmid_l, emmid_r) %>%
    collect() %>%
    as.data.table()
  
  # Stack both edge lists and build one undirected graph over emmids.
  this_graph = list(addr_matches, po_matches) %>%
    rbindlist() %>%
    as.matrix() %>%
    graph_from_edgelist(directed = FALSE)
  
  # Each connected component becomes one cluster.
  # NOTE(review): names(comps$membership) is only populated when the edge
  # matrix is character, i.e. when emmid is stored as a string - confirm.
  comps = components(this_graph)
  cluster_names = data.table(
    year_cluster = str_c(cluster_prefix, comps$membership),
    emmid = names(comps$membership)
  )
  
  setkey(cluster_names, year_cluster)
  
  # emmid -> exact_person_id lookup for the records that took part in dedupe.
  match_obsv = open_dataset(glue('data/fec_clean/cycle_table={year}/incl_in_dedupe=1')) %>%
    select(emmid, exact_person_id) %>%
    collect() %>%
    as.data.table()
  setkey(match_obsv, emmid)
  
  # Translate clustered emmids into (exact_person_id, year_cluster) pairs.
  out = match_obsv[cluster_names, on = 'emmid'][, .(exact_person_id, year_cluster)]
  out = unique(out)
  setkey(out, exact_person_id)
  
  all_year = open_dataset(glue('data/fec_clean/cycle_table={year}')) %>%
    collect() %>%
    as.data.table()
  
  setkey(all_year, exact_person_id)
  
  # Keep every row of the cycle and add its cluster label where one exists.
  # NOTE(review): if an exact_person_id ever maps to more than one cluster in
  # `out`, this join duplicates that person's rows - verify upstream.
  all_year = out[all_year, on = 'exact_person_id']
  
  # Ids that were never matched get a cluster of their own.
  # need the zero, otherwise it joins cluster 1 and exact person id 1
  all_year[, year_cluster := fifelse(is.na(year_cluster), str_c(cluster_prefix, '0', exact_person_id), year_cluster)]
  
  all_year[, this_exact_count := .N, by = exact_person_id]
  
  # Most frequent exact_person_id first, then most recent receipt date.
  # na.last = TRUE: setorder() defaults to placing NAs first, which would let a
  # row with a missing receipt date outrank every dated row.
  setorder(all_year, -this_exact_count, -contb_receipt_dt, na.last = TRUE)
  
  # After the sort above, the first row of each cluster already carries the
  # cluster's largest this_exact_count, so it alone is flagged as best.
  all_year[, year_cluster_best := as.integer(seq_len(.N) == 1L), by = year_cluster]
  all_year[, this_exact_count := NULL]
  
  # NOTE(review): write_dataset() defaults to existing_data_behavior =
  # 'overwrite', which can leave stale part files behind when a re-run
  # produces fewer files than before - consider 'delete_matching'.
  write_dataset(
    all_year,
    glue('data/fec_across/cycle_table={year}'),
    partitioning = 'year_cluster_best',
    max_rows_per_file = 5e5
  )
  
}
